package com.ferry.lock.engine.zookeeper.menagerie.locks;

import com.ferry.lock.engine.zookeeper.menagerie.ZKSessionManager;
import com.ferry.lock.engine.zookeeper.menagerie.ZKUtils;
import com.ferry.lock.engine.zookeeper.menagerie.ZkPrimitive;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;

import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;

/**
 * Created by daiyong
 */
public class ZKCondition extends ZkPrimitive implements Condition {

	private static final String conditionPrefix = "condition";
	private static final char conditionDelimiter = '-';
	private final ReentrantZKLock distributedLock;

	public ZKCondition(String baseNode, ZKSessionManager zkSessionManager, List<ACL> privileges, ReentrantZKLock lock) {
		super(baseNode, zkSessionManager, privileges);
		distributedLock = lock;
	}

	@Override
	public void await() throws InterruptedException {
		await(Long.MAX_VALUE,TimeUnit.DAYS);
	}

	@Override
	public void awaitUninterruptibly() {
		if (!distributedLock.hasLock()) {
			throw new IllegalMonitorStateException("await was called without owning the associated lock");
		}

		try {
			ZooKeeper zk = zkSessionManager.getZooKeeper();
			String conditionName = zk.create(baseNode + "/" + conditionPrefix + conditionDelimiter, emptyNode, privileges, CreateMode.EPHEMERAL_SEQUENTIAL);

			distributedLock.unLock();

			while (true) {
				localLock.lock();
				try {
					if (zk.exists(conditionName, signalWatcher) == null) {
						distributedLock.lock();
						return;
					} else {
						condition.awaitUninterruptibly();
					}
				} finally {
					localLock.unlock();
				}
			}
		} catch (KeeperException e) {
			throw new RuntimeException(e);
		} catch (InterruptedException e) {
			throw new RuntimeException(e);
		}
	}

	@Override
	public long awaitNanos(long nanosTimeout) throws InterruptedException {
		if(Thread.interrupted())
			throw new InterruptedException();
		if(!distributedLock.hasLock())
			throw new IllegalMonitorStateException("await was called without owning the associated lock");
		try {
			//release the associated zkLock
			distributedLock.unLock();

			//put a signal node onto zooKeeper, then wait for it to be deleted
			ZooKeeper zooKeeper = zkSessionManager.getZooKeeper();
			String conditionName = zooKeeper.create(baseNode + "/" + conditionPrefix + conditionDelimiter,
					emptyNode, privileges, CreateMode.EPHEMERAL_SEQUENTIAL);

			long timeLeft = nanosTimeout;
			while(true){
				if(Thread.interrupted()){
					zooKeeper.delete(conditionName,-1);
					throw new InterruptedException();
				}
				if(checkTimeout(zooKeeper, conditionName, timeLeft)) return timeLeft;

				long start = System.nanoTime();
				localLock.lock();
				try{
					long endTime = System.nanoTime();
					timeLeft -= (endTime-start);
					if(checkTimeout(zooKeeper, conditionName, timeLeft)) return timeLeft;
					else if(zooKeeper.exists(conditionName,signalWatcher)==null){
						//we have been signalled, so relock and then return
						return timeLeft;
					}else{
						timeLeft = condition.awaitNanos(timeLeft);
					}
				}finally{
					localLock.unlock();
				}
			}
		} catch (KeeperException e) {
			throw new RuntimeException(e);
		} finally{
			distributedLock.lock();
		}
	}

	@Override
	public boolean await(long time, TimeUnit unit) throws InterruptedException {
		return awaitNanos(unit.toNanos(time))>0;
	}

	@Override
	public boolean awaitUntil(Date deadline) throws InterruptedException {
		if(!distributedLock.hasLock())
			throw new IllegalMonitorStateException("await is attempted without first owning the associated lock");

		long timeToWait = deadline.getTime()-System.currentTimeMillis();
		return timeToWait > 0 && await(timeToWait, TimeUnit.MILLISECONDS);
	}

	/**
	 * Chooses a party which is waiting on this lock, and signals it.
	 * <p>
	 * Note:This implementation does NOT check the interrupted status of the current thread, as it prioritizes that
	 * a signal ALWAYS be sent, if there is a party waiting for that signal.
	 *
	 * @see #signal() in java.util.concurrent.locks.Condition
	 */
	@Override
	public void signal() {
		if(!distributedLock.hasLock())
			throw new IllegalMonitorStateException("Signal is attempted without first owning the signalling lock");

		ZooKeeper zooKeeper = zkSessionManager.getZooKeeper();
		try {
			List<String> conditionsToSignal = ZKUtils.filterByPrefix(zooKeeper.getChildren(baseNode, false), conditionPrefix);
			if(conditionsToSignal.size()<=0)return; //no parties to signal

			//delete the lowest numbered waiting party
			ZKUtils.sortBySequence(conditionsToSignal,conditionDelimiter);
			zooKeeper.delete(baseNode+"/"+conditionsToSignal.get(0),-1);

		} catch (KeeperException e) {
			throw new RuntimeException(e);
		} catch (InterruptedException e) {
			throw new RuntimeException(e);
		}
	}

	@Override
	public void signalAll() {
		if(!distributedLock.hasLock())
			throw new IllegalMonitorStateException("Signal is attempted without first owning the signalling lock");

		ZooKeeper zooKeeper = zkSessionManager.getZooKeeper();
		try {
			List<String> conditionsToSignal = ZKUtils.filterByPrefix(zooKeeper.getChildren(baseNode, false), conditionPrefix);
			if(conditionsToSignal.size()<=0)return; //no parties to signal

			//notify all waiting conditions in sequence
			ZKUtils.sortBySequence(conditionsToSignal,conditionDelimiter);

			for(String condition:conditionsToSignal){
				zooKeeper.delete(baseNode+"/"+condition,-1);
			}
		} catch (KeeperException e) {
			throw new RuntimeException(e);
		} catch (InterruptedException e) {
			throw new RuntimeException(e);
		}
	}

	private boolean checkTimeout(ZooKeeper zooKeeper, String nodeName, long timeLeft)
			throws InterruptedException, KeeperException {
		if(timeLeft<=0){
			//timed out
			zooKeeper.delete(nodeName,-1);
			return true;
		}
		return false;
	}
}
